package org.framework.lazy.cloud.network.heartbeat.server.cluster.application.impl;

import lombok.extern.slf4j.Slf4j;
import org.framework.lazy.cloud.network.heartbeat.client.netty.permeate.event.ClientChangeEvent;
import org.framework.lazy.cloud.network.heartbeat.client.netty.permeate.tcp.socket.NettyTcpClientSocket;
import org.framework.lazy.cloud.network.heartbeat.common.advanced.HandleChannelTypeAdvanced;
import org.framework.lazy.cloud.network.heartbeat.server.cluster.application.dto.LazyNettyClusterNodeDTO;
import org.framework.lazy.cloud.network.heartbeat.server.cluster.domain.model.cluster.node.LazyNettyClusterNode;
import org.framework.lazy.cloud.network.heartbeat.server.properties.ServerNodeProperties;
import org.wu.framework.lazy.orm.web.plus.stereotype.LazyApplication;
import org.framework.lazy.cloud.network.heartbeat.server.cluster.application.LazyNettyClusterNodeApplication;
import org.wu.framework.web.response.Result;
import org.framework.lazy.cloud.network.heartbeat.server.cluster.application.command.lazy.netty.cluster.node.LazyNettyClusterNodeRemoveCommand;
import org.framework.lazy.cloud.network.heartbeat.server.cluster.application.command.lazy.netty.cluster.node.LazyNettyClusterNodeStoryCommand;
import org.framework.lazy.cloud.network.heartbeat.server.cluster.application.command.lazy.netty.cluster.node.LazyNettyClusterNodeUpdateCommand;
import org.framework.lazy.cloud.network.heartbeat.server.cluster.application.command.lazy.netty.cluster.node.LazyNettyClusterNodeQueryListCommand;
import org.framework.lazy.cloud.network.heartbeat.server.cluster.application.command.lazy.netty.cluster.node.LazyNettyClusterNodeQueryOneCommand;
import org.framework.lazy.cloud.network.heartbeat.server.cluster.application.assembler.LazyNettyClusterNodeDTOAssembler;

import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import jakarta.annotation.Resource;
import org.framework.lazy.cloud.network.heartbeat.server.cluster.domain.model.cluster.node.LazyNettyClusterNodeRepository;

import java.util.List;

import org.wu.framework.lazy.orm.database.lambda.domain.LazyPage;

/**
 * Application service for cluster node configuration.
 *
 * <p>The CRUD methods convert commands to {@link LazyNettyClusterNode} domain objects through
 * {@link LazyNettyClusterNodeDTOAssembler} and delegate to {@link LazyNettyClusterNodeRepository}.
 * The starter/destroy methods manage the {@link NettyTcpClientSocket} instances this server opens
 * towards the configured cluster nodes; open sockets are tracked in an in-memory cache keyed by
 * the node configuration.
 *
 * @author Jia wei Wu
 * @date 2024/04/12 02:16 PM
 * @see org.wu.framework.lazy.orm.core.persistence.reverse.lazy.ddd.DefaultDDDLazyApplicationImpl
 **/
@Slf4j
@LazyApplication
public class LazyNettyClusterNodeApplicationImpl implements LazyNettyClusterNodeApplication {

    @Resource
    LazyNettyClusterNodeRepository lazyNettyClusterNodeRepository;

    @Resource
    ClientChangeEvent clientChangeEvent;

    @Resource
    List<HandleChannelTypeAdvanced> handleChannelTypeAdvancedList; // handlers for the data types sent by the server

    @Resource
    ServerNodeProperties serverNodeProperties;

    // Cache of the sockets connected to cluster nodes, keyed by node configuration.
    private final ConcurrentHashMap<LazyNettyClusterNode, NettyTcpClientSocket> cacheClusterNettyClientSocketMap = new ConcurrentHashMap<>();

    /**
     * Executor that runs the connect task of each cluster-node socket.
     */
    public static final ThreadPoolExecutor NETTY_CLUSTER_CLIENT_EXECUTOR =
            new ThreadPoolExecutor(20, 50, 200, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(1));

    /**
     * Creates a cluster node configuration.
     *
     * @param lazyNettyClusterNodeStoryCommand the configuration to store
     * @return {@code Result<LazyNettyClusterNode>} as returned by the repository
     * @author Jia wei Wu
     * @date 2024/04/12 02:16 PM
     **/
    @Override
    public Result<LazyNettyClusterNode> story(LazyNettyClusterNodeStoryCommand lazyNettyClusterNodeStoryCommand) {
        LazyNettyClusterNode lazyNettyClusterNode = LazyNettyClusterNodeDTOAssembler.INSTANCE.toLazyNettyClusterNode(lazyNettyClusterNodeStoryCommand);
        return lazyNettyClusterNodeRepository.story(lazyNettyClusterNode);
    }

    /**
     * Creates several cluster node configurations in one call.
     *
     * @param lazyNettyClusterNodeStoryCommandList the configurations to store
     * @return {@code Result<List<LazyNettyClusterNode>>} as returned by the repository
     * @author Jia wei Wu
     * @date 2024/04/12 02:16 PM
     **/
    @Override
    public Result<List<LazyNettyClusterNode>> batchStory(List<LazyNettyClusterNodeStoryCommand> lazyNettyClusterNodeStoryCommandList) {
        List<LazyNettyClusterNode> lazyNettyClusterNodeList = lazyNettyClusterNodeStoryCommandList.stream()
                .map(LazyNettyClusterNodeDTOAssembler.INSTANCE::toLazyNettyClusterNode)
                .collect(Collectors.toList());
        return lazyNettyClusterNodeRepository.batchStory(lazyNettyClusterNodeList);
    }

    /**
     * Updates a cluster node configuration.
     *
     * <p>Delegates to the repository's {@code story} operation, the same one used by
     * {@link #story(LazyNettyClusterNodeStoryCommand)}.
     *
     * @param lazyNettyClusterNodeUpdateCommand the configuration to update
     * @return {@code Result<LazyNettyClusterNode>} as returned by the repository
     * @author Jia wei Wu
     * @date 2024/04/12 02:16 PM
     **/
    @Override
    public Result<LazyNettyClusterNode> updateOne(LazyNettyClusterNodeUpdateCommand lazyNettyClusterNodeUpdateCommand) {
        LazyNettyClusterNode lazyNettyClusterNode = LazyNettyClusterNodeDTOAssembler.INSTANCE.toLazyNettyClusterNode(lazyNettyClusterNodeUpdateCommand);
        return lazyNettyClusterNodeRepository.story(lazyNettyClusterNode);
    }

    /**
     * Finds a single cluster node configuration.
     *
     * @param lazyNettyClusterNodeQueryOneCommand the query criteria
     * @return {@code Result<LazyNettyClusterNodeDTO>} holding the repository result converted to a DTO
     * @author Jia wei Wu
     * @date 2024/04/12 02:16 PM
     **/
    @Override
    public Result<LazyNettyClusterNodeDTO> findOne(LazyNettyClusterNodeQueryOneCommand lazyNettyClusterNodeQueryOneCommand) {
        LazyNettyClusterNode lazyNettyClusterNode = LazyNettyClusterNodeDTOAssembler.INSTANCE.toLazyNettyClusterNode(lazyNettyClusterNodeQueryOneCommand);
        return lazyNettyClusterNodeRepository.findOne(lazyNettyClusterNode)
                .convert(LazyNettyClusterNodeDTOAssembler.INSTANCE::fromLazyNettyClusterNode);
    }

    /**
     * Finds the cluster node configurations matching the criteria.
     *
     * @param lazyNettyClusterNodeQueryListCommand the query criteria
     * @return {@code Result<List<LazyNettyClusterNodeDTO>>} holding the repository results converted to DTOs
     * @author Jia wei Wu
     * @date 2024/04/12 02:16 PM
     **/
    @Override
    public Result<List<LazyNettyClusterNodeDTO>> findList(LazyNettyClusterNodeQueryListCommand lazyNettyClusterNodeQueryListCommand) {
        LazyNettyClusterNode lazyNettyClusterNode = LazyNettyClusterNodeDTOAssembler.INSTANCE.toLazyNettyClusterNode(lazyNettyClusterNodeQueryListCommand);
        return lazyNettyClusterNodeRepository.findList(lazyNettyClusterNode)
                .convert(lazyNettyClusterNodes -> lazyNettyClusterNodes.stream()
                        .map(LazyNettyClusterNodeDTOAssembler.INSTANCE::fromLazyNettyClusterNode)
                        .collect(Collectors.toList()));
    }

    /**
     * Finds one page of the cluster node configurations matching the criteria.
     *
     * @param size                                 page size, passed through to the repository
     * @param current                              page index, passed through to the repository
     * @param lazyNettyClusterNodeQueryListCommand the query criteria
     * @return {@code Result<LazyPage<LazyNettyClusterNodeDTO>>} holding the page converted to DTOs
     * @author Jia wei Wu
     * @date 2024/04/12 02:16 PM
     **/
    @Override
    public Result<LazyPage<LazyNettyClusterNodeDTO>> findPage(int size, int current, LazyNettyClusterNodeQueryListCommand lazyNettyClusterNodeQueryListCommand) {
        LazyNettyClusterNode lazyNettyClusterNode = LazyNettyClusterNodeDTOAssembler.INSTANCE.toLazyNettyClusterNode(lazyNettyClusterNodeQueryListCommand);
        return lazyNettyClusterNodeRepository.findPage(size, current, lazyNettyClusterNode)
                .convert(page -> page.convert(LazyNettyClusterNodeDTOAssembler.INSTANCE::fromLazyNettyClusterNode));
    }

    /**
     * Removes a cluster node configuration.
     *
     * @param lazyNettyClusterNodeRemoveCommand identifies the configuration to remove
     * @return {@code Result<LazyNettyClusterNode>} as returned by the repository
     * @author Jia wei Wu
     * @date 2024/04/12 02:16 PM
     **/
    @Override
    public Result<LazyNettyClusterNode> remove(LazyNettyClusterNodeRemoveCommand lazyNettyClusterNodeRemoveCommand) {
        LazyNettyClusterNode lazyNettyClusterNode = LazyNettyClusterNodeDTOAssembler.INSTANCE.toLazyNettyClusterNode(lazyNettyClusterNodeRemoveCommand);
        return lazyNettyClusterNodeRepository.remove(lazyNettyClusterNode);
    }

    /**
     * Starts the connection to one cluster node.
     *
     * <p>If a socket is already cached for a node with the same cluster node ID, a warning is
     * logged and nothing else happens. Otherwise a new socket is cached and its connect call is
     * submitted to {@link #NETTY_CLUSTER_CLIENT_EXECUTOR}. If the task cannot be submitted, or the
     * connect call throws, the cached entry is removed again so that the node can be started later.
     *
     * @param lazyNettyClusterNode configuration of the node to connect to
     */
    @Override
    public void starterOneClusterNode(LazyNettyClusterNode lazyNettyClusterNode) {
        String inetHost = lazyNettyClusterNode.getClusterNodeHost();
        Integer inetPort = lazyNettyClusterNode.getClusterNodePort();
        String clusterNodeId = lazyNettyClusterNode.getClusterNodeId();

        // Skip nodes that are already registered; checked before any socket is constructed.
        boolean alreadyRegistered = cacheClusterNettyClientSocketMap
                .keySet().stream()
                .anyMatch(cachedNode -> Objects.equals(clusterNodeId, cachedNode.getClusterNodeId()));
        if (alreadyRegistered) {
            log.warn("当前节点注册:{} 已经存在", lazyNettyClusterNode);
            return;
        }

        // ID of the current node, used as the client ID of the outgoing connection.
        String clusterNodeClientId = serverNodeProperties.getNodeId();
        NettyTcpClientSocket nettyTcpClientSocket = new NettyTcpClientSocket(
                inetHost, inetPort, clusterNodeClientId,
                clusterNodeId, null, null,
                clientChangeEvent, handleChannelTypeAdvancedList);
        cacheClusterNettyClientSocketMap.put(lazyNettyClusterNode, nettyTcpClientSocket);

        log.info("Current service connection Netty client: {}, Netty port: {}", inetHost, inetPort);
        try {
            NETTY_CLUSTER_CLIENT_EXECUTOR.execute(() -> connectClusterNode(lazyNettyClusterNode, nettyTcpClientSocket));
        } catch (RuntimeException e) {
            // The executor did not accept the task (for example it is saturated): drop the entry
            // cached above so it does not block later start attempts, then let the caller see the failure.
            cacheClusterNettyClientSocketMap.remove(lazyNettyClusterNode, nettyTcpClientSocket);
            throw e;
        }
    }

    /**
     * Runs the connect call of a cached socket on the executor thread; on failure the error is
     * logged and the cached entry is removed.
     */
    private void connectClusterNode(LazyNettyClusterNode lazyNettyClusterNode, NettyTcpClientSocket nettyTcpClientSocket) {
        try {
            nettyTcpClientSocket.newConnect2Server();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for the pool worker.
                Thread.currentThread().interrupt();
            }
            cacheClusterNettyClientSocketMap.remove(lazyNettyClusterNode, nettyTcpClientSocket);
            log.error("Failed to connect to cluster node: {}, host: {}, port: {}",
                    lazyNettyClusterNode.getClusterNodeId(),
                    lazyNettyClusterNode.getClusterNodeHost(),
                    lazyNettyClusterNode.getClusterNodePort(), e);
        }
    }

    /**
     * Starts the connections to all cluster nodes.
     *
     * <p>Queries the repository for every node whose {@code isDeleted} flag is {@code false} and
     * calls {@link #starterOneClusterNode(LazyNettyClusterNode)} for each of them.
     */
    @Override
    public void starterClusterNodes() {
        // Query all configurations that are not deleted.
        LazyNettyClusterNode queryLazyNettyClusterNode = new LazyNettyClusterNode();
        queryLazyNettyClusterNode.setIsDeleted(false);

        lazyNettyClusterNodeRepository.findList(queryLazyNettyClusterNode).accept(lazyNettyClusterNodeList -> {
            for (LazyNettyClusterNode lazyNettyClusterNode : lazyNettyClusterNodeList) {
                starterOneClusterNode(lazyNettyClusterNode);
            }
        });
    }

    /**
     * Closes the connection to one cluster node.
     *
     * <p>Every cached socket whose node has the same cluster node ID, host and port as the given
     * node is shut down and removed from the cache, so the node can be started again later.
     *
     * @param needCloseLazyNettyClusterNode configuration of the node whose connection is closed
     */
    @Override
    public void destroyOneClusterNode(LazyNettyClusterNode needCloseLazyNettyClusterNode) {
        // ID of the current node; it is the client side of the connection and only used for logging.
        String currentNodeId = serverNodeProperties.getNodeId();
        String needCloseInetHost = needCloseLazyNettyClusterNode.getClusterNodeHost();
        Integer needCloseInetPort = needCloseLazyNettyClusterNode.getClusterNodePort();
        String needCloseClusterNodeId = needCloseLazyNettyClusterNode.getClusterNodeId();

        // Close the matching sockets. ConcurrentHashMap tolerates removal during forEach.
        cacheClusterNettyClientSocketMap.forEach((cachedNode, nettyTcpClientSocket) -> {
            String inetHost = cachedNode.getClusterNodeHost();
            Integer inetPort = cachedNode.getClusterNodePort();
            // Match on the cached (remote) node's ID, which is the identity used when starting a node.
            if (Objects.equals(cachedNode.getClusterNodeId(), needCloseClusterNodeId)
                    && Objects.equals(inetPort, needCloseInetPort)
                    && Objects.equals(inetHost, needCloseInetHost)) {
                nettyTcpClientSocket.shutdown();
                cacheClusterNettyClientSocketMap.remove(cachedNode, nettyTcpClientSocket);
                // Closed the connection between this client and the server host:port.
                log.warn("Close client: {} Connect to server: {}: {}", currentNodeId, inetHost, inetPort);
            }
        });
    }

    /**
     * Closes the connections to all cluster nodes.
     *
     * <p>Every cached socket is shut down and removed from the cache.
     */
    @Override
    public void destroyClusterNodes() {
        // Close every socket. ConcurrentHashMap tolerates removal during forEach.
        cacheClusterNettyClientSocketMap.forEach((cachedNode, nettyTcpClientSocket) -> {
            nettyTcpClientSocket.shutdown();
            cacheClusterNettyClientSocketMap.remove(cachedNode, nettyTcpClientSocket);
            String clientId = cachedNode.getClusterNodeId();
            String inetHost = cachedNode.getClusterNodeHost();
            Integer inetPort = cachedNode.getClusterNodePort();
            // Closed the connection to the server host:port.
            log.warn("Close client: {} Connect to server: {}: {}", clientId, inetHost, inetPort);
        });
    }

}